package fun.easycode.datastream;

import cn.hutool.core.util.IdUtil;
import fun.easycode.jointblock.util.JacksonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

/**
 * 每一个processor对应一个sink，会以监听的方式注册到canal中
 *  当数据符合sink监听的表时，数据会被传入sink中,然后sink会将数据发送到消息队列中
 * @author xuzhen97
 */
@Slf4j
public class DataEntryMQSink<T> implements CanalListener<T>{
    private final IDataProcessor<T> processor;
    private final DataStreamProperties properties;
    private final DefaultMQProducer producer;


    public DataEntryMQSink(IDataProcessor<T> processor, DataStreamProperties properties) {
        this.processor = processor;
        this.properties = properties;
        producer = createProducer();
    }

    /**
     * 启动生产者
     * @throws MQClientException 启动异常
     */
    public void start() throws MQClientException {
        producer.start();
    }

    /**
     * 发送数据
     * @param dataEntry 数据
     */
    public void sink(DataEntry<T> dataEntry){
        // 最终会被输出的报表，这里消息要以具体的报表为维度进行一些处理
        // 比如如果队列充足的情况下，一个报表一个队列，如果队列不足，都会堆积在0的队列中
        String outputTableName = processor.getParam().getOutputTableName();
        // TODO key 业务上的id, 后续可以和日志绑定方便排查问题
        // 也可以进一步抽象但是增加工作量
        String key = IdUtil.getSnowflakeNextIdStr();
        // 消息体
        Message message = new Message(properties.getRocketMq().getDefaultTopic()
                , processor.getMark(), key, JacksonUtil.toJson(dataEntry).getBytes());
        try {
            // 发送顺序消息
            producer.send(message, new MessageQueueSelectorImpl(), outputTableName);
            log.info(" send message to mq success, key: {} quickEntry {}", key, JacksonUtil.toJson(dataEntry));
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 创建生产者
     * @return 生产者
     */
    private DefaultMQProducer createProducer(){
        // 因为一个处理器对应一个sink, 所以这里的生产者名称就是处理器的名称
        DefaultMQProducer producer = new DefaultMQProducer(processor.getMark());
        producer.setNamesrvAddr(properties.getRocketMq().getNameSrvAddr());
        return producer;
    }

    @Override
    public void onMessage(DataEntry<T> entry) {
        sink(entry);
    }

    @Override
    public String getTableName() {
        return DataContext.getTableName(processor);
    }

    @Override
    public Class<T> getTargetClass() {
        return processor.getParam().getInputClass();
    }

    @Override
    public String getMark() {
        return processor.getMark();
    }
}
